/**
 * Copyright 2019 吉鼎科技.

 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.easyplatform.jms;

import org.apache.commons.pool2.KeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.lang.IllegalStateException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * A JMS {@link ConnectionFactory} that hands out pooled connections.
 * <p>
 * Connections obtained from the wrapped {@link #getConnectionFactory() connection factory} are
 * wrapped in {@link ConnectionPool} instances and kept in a commons-pool2
 * {@link GenericKeyedObjectPool} keyed by the user name / password pair
 * ({@link ConnectionKey}). Every call to {@link #createConnection(String, String)} returns a new
 * {@link PooledConnection} facade over one of those shared {@link ConnectionPool} instances.
 * <p>
 * The underlying pool is created lazily on first use and discarded by {@link #stop()}. Settings
 * that are stored in the pool itself ({@link #setMaxConnections(int)} and
 * {@link #setTimeBetweenExpirationCheckMillis(long)}) are therefore lost when the factory is
 * stopped and must be re-applied afterwards.
 *
 * @author <a href="mailto:davidchen@epclouds.com">littleDog</a> <br/>
 * @since 2.0.0 <br/>
 */
public class PooledConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory {
    private static final Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);

    /** Set to {@code true} by {@link #stop()} and back to {@code false} by {@link #start()}. */
    protected final AtomicBoolean stopped = new AtomicBoolean(false);

    /** Lazily created by {@link #initConnectionsPool()}; reset to {@code null} by {@link #stop()}. */
    private GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;

    /** The wrapped factory; {@link #setConnectionFactory(Object)} only accepts a {@link ConnectionFactory}. */
    protected Object connectionFactory;

    private int maximumActiveSessionPerConnection = 500;
    private int idleTimeout = 30 * 1000;
    private boolean blockIfSessionPoolIsFull = true;
    private long blockIfSessionPoolIsFullTimeout = -1L;
    private long expiryTimeout = 0L;
    private boolean createConnectionOnStartup = true;
    private boolean useAnonymousProducers = true;
    private boolean reconnectOnException = true;

    /**
     * Hand-over slot between the pool's {@code makeObject} callback and
     * {@link #createConnection(String, String)}: {@code addObject} does not return the instance it
     * created, so the callback publishes it here and the caller takes it out again.
     */
    private final AtomicReference<ConnectionPool> mostRecentlyCreated = new AtomicReference<ConnectionPool>(null);

    /**
     * Creates the keyed connections pool if it does not exist yet; does nothing otherwise.
     * <p>
     * The pool is created with JMX disabled, at most one idle instance per key, FIFO ordering, and
     * validation both on borrow and while idle.
     */
    public void initConnectionsPool() {
        if (this.connectionsPool == null) {
            final GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
            poolConfig.setJmxEnabled(false);
            this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(
                    new KeyedPooledObjectFactory<ConnectionKey, ConnectionPool>() {
                        /**
                         * Opens a new JMS connection for the key, wraps it in a ConnectionPool
                         * configured from this factory's current settings and publishes it through
                         * {@code mostRecentlyCreated}.
                         */
                        @Override
                        public PooledObject<ConnectionPool> makeObject(ConnectionKey connectionKey) throws Exception {
                            Connection delegate = createConnection(connectionKey);

                            ConnectionPool connection = createConnectionPool(delegate);
                            connection.setIdleTimeout(getIdleTimeout());
                            connection.setExpiryTimeout(getExpiryTimeout());
                            connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
                            connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
                            // The timeout is only forwarded when blocking is enabled and it is positive.
                            if (isBlockIfSessionPoolIsFull() && getBlockIfSessionPoolIsFullTimeout() > 0) {
                                connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout());
                            }
                            connection.setUseAnonymousProducers(isUseAnonymousProducers());
                            connection.setReconnectOnException(isReconnectOnException());

                            LOG.trace("Created new connection: {}", connection);

                            PooledConnectionFactory.this.mostRecentlyCreated.set(connection);

                            return new DefaultPooledObject<ConnectionPool>(connection);
                        }

                        /** Closes the pooled connection; a failure to close is logged and ignored. */
                        @Override
                        public void destroyObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) throws Exception {
                            ConnectionPool connection = pooledObject.getObject();
                            try {
                                LOG.trace("Destroying connection: {}", connection);
                                connection.close();
                            } catch (Exception e) {
                                LOG.warn("Close connection failed for connection: {}. This exception will be ignored.", connection, e);
                            }
                        }

                        /** Reports the instance as invalid when its {@code expiredCheck()} returns true. */
                        @Override
                        public boolean validateObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) {
                            ConnectionPool connection = pooledObject.getObject();
                            if (connection != null && connection.expiredCheck()) {
                                LOG.trace("Connection has expired: {} and will be destroyed", connection);
                                return false;
                            }

                            return true;
                        }

                        /** No-op. */
                        @Override
                        public void activateObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) throws Exception {
                        }

                        /** No-op. */
                        @Override
                        public void passivateObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) throws Exception {
                        }

                    }, poolConfig);

            this.connectionsPool.setMaxIdlePerKey(1);
            this.connectionsPool.setLifo(false);

            this.connectionsPool.setTestOnBorrow(true);
            this.connectionsPool.setTestWhileIdle(true);
        }
    }

    /**
     * @return the currently configured ConnectionFactory used to create the pooled Connections.
     */
    public Object getConnectionFactory() {
        return connectionFactory;
    }

    /**
     * @param toUse The factory to use to create pooled Connections.
     * @throws IllegalArgumentException if {@code toUse} is not a {@link ConnectionFactory}
     *                                  (this includes {@code null}).
     */
    public void setConnectionFactory(final Object toUse) {
        if (toUse instanceof ConnectionFactory) {
            this.connectionFactory = toUse;
        } else {
            throw new IllegalArgumentException("connectionFactory should implement javax.jms.ConnectionFactory");
        }
    }

    /** Same as {@link #createConnection()}, with the result cast to {@link QueueConnection}. */
    @Override
    public QueueConnection createQueueConnection() throws JMSException {
        return (QueueConnection) createConnection();
    }

    /** Same as {@link #createConnection(String, String)}, with the result cast to {@link QueueConnection}. */
    @Override
    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
        return (QueueConnection) createConnection(userName, password);
    }

    /** Same as {@link #createConnection()}, with the result cast to {@link TopicConnection}. */
    @Override
    public TopicConnection createTopicConnection() throws JMSException {
        return (TopicConnection) createConnection();
    }

    /** Same as {@link #createConnection(String, String)}, with the result cast to {@link TopicConnection}. */
    @Override
    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
        return (TopicConnection) createConnection(userName, password);
    }

    /** Equivalent to {@code createConnection(null, null)}. */
    @Override
    public Connection createConnection() throws JMSException {
        return createConnection(null, null);
    }

    /**
     * Returns a pooled connection for the given credentials.
     * <p>
     * While fewer idle instances than {@link #getMaxConnections()} exist for the credentials, a new
     * {@link ConnectionPool} is added to the pool and used. Otherwise an existing instance is
     * borrowed, its reference count is incremented, and it is immediately returned to the pool so
     * that it stays available to other callers.
     *
     * @param userName the user name, may be {@code null}
     * @param password the password, may be {@code null}
     * @return a new facade over a shared {@link ConnectionPool}, or {@code null} if this factory
     *         has been stopped
     * @throws JMSException if a connection cannot be added to, borrowed from or returned to the pool
     */
    @Override
    public synchronized Connection createConnection(String userName, String password) throws JMSException {
        if (stopped.get()) {
            LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
            return null;
        }

        // stop() is not synchronized and nulls the field; work on one captured reference so a
        // concurrent stop() surfaces as a pool error (JMSException) instead of a NullPointerException.
        final GenericKeyedObjectPool<ConnectionKey, ConnectionPool> pool = getConnectionsPool();

        ConnectionPool connection = null;
        ConnectionKey key = new ConnectionKey(userName, password);
        if (pool.getNumIdle(key) < getMaxConnections()) {
            try {
                pool.addObject(key);
                // makeObject() published the new instance in mostRecentlyCreated.
                connection = mostRecentlyCreated.getAndSet(null);
                if (connection == null) {
                    throw new IllegalStateException("The connections pool did not create a new connection");
                }
                connection.incrementReferenceCount();
            } catch (Exception e) {
                throw createJmsException("Error while attempting to add new Connection to the pool", e);
            }
        } else {
            try {
                // Keep borrowing until an instance that still holds a connection is found; the
                // reference count is incremented while holding the instance's monitor.
                while (connection == null) {
                    connection = pool.borrowObject(key);
                    synchronized (connection) {
                        if (connection.getConnection() != null) {
                            connection.incrementReferenceCount();
                            break;
                        }
                        // No underlying connection: hand it back to the pool and try again.
                        pool.returnObject(key, connection);
                        connection = null;
                    }
                }
            } catch (Exception e) {
                throw createJmsException("Error while attempting to retrieve a connection from the pool", e);
            }

            try {
                pool.returnObject(key, connection);
            } catch (Exception e) {
                throw createJmsException("Error when returning connection to the pool", e);
            }
        }

        return newPooledConnection(connection);
    }

    /**
     * Wraps a shared {@link ConnectionPool} in the facade handed to callers.
     *
     * @param connection the pooled connection to wrap
     * @return a new {@link PooledConnection} over {@code connection}
     */
    protected Connection newPooledConnection(ConnectionPool connection) {
        return new PooledConnection(connection);
    }

    /** Builds a {@link JMSException} carrying {@code cause} both as linked exception and as cause. */
    private JMSException createJmsException(String msg, Exception cause) {
        JMSException exception = new JMSException(msg);
        exception.setLinkedException(cause);
        exception.initCause(cause);
        return exception;
    }

    /**
     * Opens a new connection on the wrapped factory.
     *
     * @param key the credentials; when both user name and password are {@code null} the
     *            no-argument {@code createConnection()} of the wrapped factory is used
     * @return the newly created connection
     * @throws JMSException          if the wrapped factory fails to create the connection
     * @throws IllegalStateException if no {@link ConnectionFactory} has been configured
     */
    protected Connection createConnection(ConnectionKey key) throws JMSException {
        if (connectionFactory instanceof ConnectionFactory) {
            if (key.getUserName() == null && key.getPassword() == null) {
                return ((ConnectionFactory) connectionFactory).createConnection();
            } else {
                return ((ConnectionFactory) connectionFactory).createConnection(key.getUserName(), key.getPassword());
            }
        } else {
            throw new IllegalStateException("connectionFactory should implement javax.jms.ConnectionFactory");
        }
    }

    /**
     * Marks the factory as started and, if {@link #isCreateConnectionOnStartup()} is set, creates
     * and immediately closes one connection. A {@link JMSException} during that step is logged
     * and ignored.
     */
    public void start() {
        LOG.debug("Starting the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup());
        stopped.set(false);
        if (isCreateConnectionOnStartup()) {
            try {
                // createConnection() returns null if stop() was called in the meantime.
                Connection warmUp = createConnection();
                if (warmUp != null) {
                    warmUp.close();
                }
            } catch (JMSException e) {
                LOG.warn("Create pooled connection during start failed. This exception will be ignored.", e);
            }
        }
    }

    /**
     * Marks the factory as stopped and closes and discards the connections pool. Calling it on an
     * already stopped factory does nothing. A failure to close the pool is logged and ignored;
     * the pool reference is dropped either way.
     */
    public void stop() {
        if (stopped.compareAndSet(false, true)) {
            final GenericKeyedObjectPool<ConnectionKey, ConnectionPool> pool = connectionsPool;
            LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
                    pool != null ? pool.getNumActive() : 0);
            if (pool != null) {
                try {
                    pool.close();
                } catch (Exception e) {
                    LOG.warn("Close of the connections pool failed. This exception will be ignored.", e);
                } finally {
                    connectionsPool = null;
                }
            }
        }
    }

    /** Clears the connections pool; does nothing when the factory is stopped. */
    public void clear() {
        if (stopped.get()) {
            return;
        }

        getConnectionsPool().clear();
    }

    /** @return the value passed to each new {@link ConnectionPool} as its maximum active sessions (default 500). */
    public int getMaximumActiveSessionPerConnection() {
        return maximumActiveSessionPerConnection;
    }

    /**
     * @param maximumActiveSessionPerConnection the limit applied to connections created from now on
     */
    public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
        this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
    }

    /**
     * @param block the "block if session pool is full" flag applied to connections created from now on
     */
    public void setBlockIfSessionPoolIsFull(boolean block) {
        this.blockIfSessionPoolIsFull = block;
    }

    /** @return the "block if session pool is full" flag (default {@code true}). */
    public boolean isBlockIfSessionPoolIsFull() {
        return this.blockIfSessionPoolIsFull;
    }

    /** @return the pool's maximum number of idle instances per key (1 for a freshly created pool). */
    public int getMaxConnections() {
        return getConnectionsPool().getMaxIdlePerKey();
    }

    /**
     * Sets both the maximum idle and the maximum total number of instances per key on the pool.
     *
     * @param maxConnections the new per-key limit
     */
    public void setMaxConnections(int maxConnections) {
        getConnectionsPool().setMaxIdlePerKey(maxConnections);
        getConnectionsPool().setMaxTotalPerKey(maxConnections);
    }

    /** @return the idle timeout passed to each new {@link ConnectionPool} (default {@code 30 * 1000}). */
    public int getIdleTimeout() {
        return idleTimeout;
    }

    /**
     * @param idleTimeout the idle timeout applied to connections created from now on
     */
    public void setIdleTimeout(int idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    /**
     * @param expiryTimeout the expiry timeout applied to connections created from now on
     */
    public void setExpiryTimeout(long expiryTimeout) {
        this.expiryTimeout = expiryTimeout;
    }

    /** @return the expiry timeout passed to each new {@link ConnectionPool} (default 0). */
    public long getExpiryTimeout() {
        return expiryTimeout;
    }

    /** @return whether {@link #start()} creates a connection up front (default {@code true}). */
    public boolean isCreateConnectionOnStartup() {
        return createConnectionOnStartup;
    }

    /**
     * @param createConnectionOnStartup whether {@link #start()} should create a connection up front
     */
    public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
        this.createConnectionOnStartup = createConnectionOnStartup;
    }

    /** @return the "use anonymous producers" flag passed to each new {@link ConnectionPool} (default {@code true}). */
    public boolean isUseAnonymousProducers() {
        return this.useAnonymousProducers;
    }

    /**
     * @param value the "use anonymous producers" flag applied to connections created from now on
     */
    public void setUseAnonymousProducers(boolean value) {
        this.useAnonymousProducers = value;
    }

    /**
     * @return the connections pool, creating it first if necessary
     */
    protected GenericKeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() {
        initConnectionsPool();
        return this.connectionsPool;
    }

    /**
     * @param timeBetweenExpirationCheckMillis the value forwarded to the pool's
     *                                         {@code setTimeBetweenEvictionRunsMillis}
     */
    public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) {
        getConnectionsPool().setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis);
    }

    /** @return the pool's {@code getTimeBetweenEvictionRunsMillis} value. */
    public long getTimeBetweenExpirationCheckMillis() {
        return getConnectionsPool().getTimeBetweenEvictionRunsMillis();
    }

    /** @return the number of idle instances reported by the pool's {@code getNumIdle()}. */
    public int getNumConnections() {
        return getConnectionsPool().getNumIdle();
    }

    /**
     * Wraps a freshly opened JMS connection; the pool's factory callback configures the result.
     *
     * @param connection the JMS connection to wrap
     * @return a new {@link ConnectionPool} over {@code connection}
     */
    protected ConnectionPool createConnectionPool(Connection connection) {
        return new ConnectionPool(connection);
    }

    /** @return the configured block timeout (default -1); only forwarded to new connections when positive. */
    public long getBlockIfSessionPoolIsFullTimeout() {
        return blockIfSessionPoolIsFullTimeout;
    }

    /**
     * @param blockIfSessionPoolIsFullTimeout the block timeout; forwarded to connections created
     *                                        from now on only if blocking is enabled and the value is positive
     */
    public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
        this.blockIfSessionPoolIsFullTimeout = blockIfSessionPoolIsFullTimeout;
    }

    /** @return the "reconnect on exception" flag passed to each new {@link ConnectionPool} (default {@code true}). */
    public boolean isReconnectOnException() {
        return reconnectOnException;
    }

    /**
     * @param reconnectOnException the "reconnect on exception" flag applied to connections created from now on
     */
    public void setReconnectOnException(boolean reconnectOnException) {
        this.reconnectOnException = reconnectOnException;
    }

    /**
     * Writes the current configuration into {@code props} as string values, one entry per
     * configurable setting, keyed by the setting's bean property name.
     *
     * @param props the properties object to fill
     */
    protected void populateProperties(Properties props) {
        props.setProperty("maximumActiveSessionPerConnection", Integer.toString(getMaximumActiveSessionPerConnection()));
        props.setProperty("maxConnections", Integer.toString(getMaxConnections()));
        props.setProperty("idleTimeout", Integer.toString(getIdleTimeout()));
        props.setProperty("expiryTimeout", Long.toString(getExpiryTimeout()));
        props.setProperty("timeBetweenExpirationCheckMillis", Long.toString(getTimeBetweenExpirationCheckMillis()));
        props.setProperty("createConnectionOnStartup", Boolean.toString(isCreateConnectionOnStartup()));
        props.setProperty("useAnonymousProducers", Boolean.toString(isUseAnonymousProducers()));
        // The timeout below is only honoured when this flag is set, so export both.
        props.setProperty("blockIfSessionPoolIsFull", Boolean.toString(isBlockIfSessionPoolIsFull()));
        props.setProperty("blockIfSessionPoolIsFullTimeout", Long.toString(getBlockIfSessionPoolIsFullTimeout()));
        props.setProperty("reconnectOnException", Boolean.toString(isReconnectOnException()));
    }
}
